from celery import Task


class HookTask(Task):
    """
    任务钩子： 监控任务的执行
    """

    # 成功时执行
    def on_success(self, ret_val, task_id, args, kwargs):
        print('ret_val', ret_val)
        print(f'task id:{task_id} , arg:{args} , successful !')

    # 失败时执行
    def on_failure(self, exc, task_id, args, kwargs, error_info):
        print(f'task id:{task_id} , arg:{args} , failed ! errors: {exc}')

    # 任务重试时执行
    def on_retry(self, exc, task_id, args, kwargs, error_info):
        print(f'task id:{task_id} , arg:{args} , retry !  errors: {exc}')
